package com.offcn.bigdata.spark.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * spark入门案例之scala的版本
  * master url含义及其可能的值：
  *    含义：就是当前spark作业执行的方式
  *    local：本地执行模式，作业在本地运行，不会提交到集群中去
  *        local     ：为当前的spark程序分配一个工作线程
  *        local[N]  : 为当前的spark程序分配N个工作线程
  *        local[*]  : 为当前的spark程序分配可用个工作线程
  *        local[N, R] ： 和前面三个相比较就多了一个R，R--retry，重试，当提交作业失败之后，会进行最多R次的重试
  *    standalone：基于spark自己的集群来执行作业
  *         分布式： spark://<ip>:<port>
  *         HA   ： spark://<ip1>:<port1>,<ip2>:<port2>
  *            部署模式deploy-mode：默认是client
  *                 client
  *                     sparkContext或者driver在提交spark作业的机器上面创建
  *                 cluster
  *                     sparkContext或者driver在spark集群中的worker节点上面创建
  *    yarn：基于yarn集群来执行作业
  *         部署模式deploy-mode: 默认是client
      *         client  :
  *                 sparkContext或者driver在提交spark作业的机器上面创建
      *         cluster :
  *                 sparkContext或者driver在yarn集群中的nodemanager节点上面创建
  */
object ScalaSparkWordCountApp {
    def main(args: Array[String]): Unit = {

        Logger.getLogger("org.spark_project").setLevel(Level.INFO)
        Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
        val conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName(s"${ScalaSparkWordCountApp.getClass.getSimpleName}")

        val sc = new SparkContext(conf)

        val lines: RDD[String] = sc.textFile("file:/E:/data/spark/hello.txt")

        println("partitions: " + lines.getNumPartitions)

        val words: RDD[String] = lines.flatMap(line => line.split("\\s+"))

        val pairs: RDD[(String, Int)] = words.map(word => (word, 1))

        val ret: RDD[(String, Int)] = pairs.reduceByKey((v1, v2) => v1 + v2)

        ret.foreach(t => println(t))

        sc.stop()
    }
}
